Skip to content

Memo

Chinese character in Jupyter

import matplotlib
matplotlib.matplotlib_fname()
!mv simhei.ttf /home/pai/lib/python3.6/site-packages/matplotlib/mpl-data/fonts/ttf/
plt.rcParams['font.sans-serif']=['SimHei'] 
plt.rcParams['axes.unicode_minus']=False

Reference:
https://www.freeaihub.com/post/110531.html
https://stackoverflow.com/questions/42097053/matplotlib-cannot-find-basic-fonts

Jupyter notebook setting

# limit the number of rows that pandas dataframe prints
pd.set_option('display.max_rows', 100)

# ignore warning messages
import warnings
warnings.filterwarnings('ignore')

# improve figure resolution
%config InlineBackend.figure_format = "retina"

Python ODPS

Read table from ODPS with parallel processing

# configure environment
import odps
import sys; sys.path.append("/home/admin/workspace")
from utility import read_records, to_pandas

odps_config = {"access_id":  'access_id',
               "access_key": 'access_key',
               "project":    'cnalgo_dev',
               "endpoint":   'http://service-corp.odps.aliyun-inc.com/api'}

o = odps.ODPS(odps_config["access_id"], odps_config["access_key"], odps_config["project"], odps_config["endpoint"])
%%time
# read tables from ODPS
input_table = 'cnalgo_sta_repeat_customers_random_subsample_treat_control_union'
df = to_pandas(input_table, o, N_WORKERS=8)

Write table into ODPS

from utility import odps_writer

target_table = 'cnalgo_sta_repeat_customers_random_subsample_treat'
output_table = buyers_treat_sampled[['biz_buyer_id']]#.apply(lambda x: str(x.biz_buyer_id), axis=1)
odps_writer(output_table, o.get_table(target_table))
import pandas as pd
from odps.tunnel import TableTunnel
from multiprocessing import Pool

def read_records(tunnel, table, session_id, start, count, columns):
    local_session = tunnel.create_download_session(table.name, download_id=session_id)
    result = {}
    for col in range(len(columns)):
        result[columns[col]] = []
    with local_session.open_record_reader(start, count) as reader:
        for record in reader:
            for i in range(len(record.values)):
                result[columns[i]] += [record.values[i]]
    df = pd.DataFrame(result)
    return df

def to_pandas(table_name, o, N_WORKERS=3):
    import math
    table = o.get_table(table_name)
    tunnel = TableTunnel(o)
    download_session = tunnel.create_download_session(table.name)
    session_id = download_session.id
    pool = Pool(processes=N_WORKERS)
    chunks = []
    count = table.open_reader().count
    if count < N_WORKERS:
        N_WORKERS = 1
    chunk_count = math.floor(count / N_WORKERS)
    columns = table.schema.names
    for i in range(N_WORKERS):
        start_i = i * chunk_count
        count_i = (count - i * chunk_count) if (i + 1) == N_WORKERS else chunk_count
        chunks.append(pool.apply_async(read_records, (tunnel, table, session_id, start_i, count_i, columns)))
    pool.close()
    pool.join()
    df = pd.concat([c.get() for c in chunks])
    df = df.reset_index(drop=True)
    return df

def odps_writer(df, t):
    with t.open_writer() as writer:
        writer.write(df.values.tolist())