And here is an example from the Python HTTP server side, which implements the Delta Lake DB service:
@retry(Exception, tries=5, delay=1, backoff=2)
def retrieve_ongoing_game_data(self, username, game_version):
    """Fetch the stored user row for ``username`` at ``game_version``.

    The user id is ``str(mmh3.hash128(username))``. The users Delta table is
    exposed as a PyArrow dataset and scanned with a filter on ``userid`` and
    ``cur_game_version``; only the first record batch of the scan is read.

    Args:
        username: Name that is hashed into the ``userid`` lookup key.
        game_version: Value compared against the ``cur_game_version`` column.

    Returns:
        The first batch converted with ``to_pandas().to_dict('records')``
        (a list of row dicts), or ``None`` when the scan yields no batch.
    """
    users_table = DeltaTable(
        self.basecreatedb.usersdbpathforgame,
        storage_options=GameDB.so,
    )
    uid = str(mmh3.hash128(username))
    dataset = users_table.to_pyarrow_dataset()

    same_user = dspy.field("userid") == uid
    same_version = dspy.field("cur_game_version") == game_version
    batches = dataset.to_batches(filter=same_user & same_version, batch_size=1)

    # Only the first batch is ever consumed; the rest of the scan is dropped.
    first_batch = next(iter(batches), None)
    if first_batch is None:
        return None
    return first_batch.to_pandas().to_dict('records')
#
def _register_ongoing_game(self, uid, gid, ts, game_version, websocket_url, is_creator=False):
    """Point an existing user row at the game the user has just joined.

    A one-row Polars frame describing the user is merged into the users
    Delta table, matching on ``userid`` and ``cur_game_version``. Only a
    matched-update clause is configured: on a match the row's
    ``ongoing_gameid``, ``last_joined_gameat``, ``ongoing_game_websocket_url``
    and ``ongoing_game_is_creator`` columns are overwritten.

    Args:
        uid: User id; stored as ``str(uid)``.
        gid: Game id; stored as ``str(gid)``.
        ts: Join time, passed through ``int()`` into the millisecond UTC
            datetime columns.
        game_version: Game version used in the merge predicate.
        websocket_url: Websocket URL recorded for the ongoing game.
        is_creator: Whether this user is recorded as the game's creator.

    Returns:
        ``True`` when the merge reports at least one updated target row,
        ``False`` otherwise.
    """
    joined_at = int(ts)
    user_row = {
        'cur_game_version': game_version,
        'userid': str(uid),
        'accomplishments': None,
        'ongoing_gameid': str(gid),
        'last_joined_gameat': joined_at,
        'ongoing_game_started': None,
        'ongoing_game_finished': None,
        'ongoing_game_is_creator': is_creator,
        'ongoing_game_websocket_url': websocket_url,
        'lastonline': joined_at,
    }
    utc_millis = pl.Datetime('ms', time_zone="UTC")
    user_schema = {
        "cur_game_version": pl.String,
        "userid": pl.String,
        "accomplishments": pl.Array(pl.String, 0),
        "ongoing_gameid": pl.String,
        "last_joined_gameat": utc_millis,
        "ongoing_game_started": pl.Boolean,
        "ongoing_game_finished": pl.Boolean,
        "ongoing_game_is_creator": pl.Boolean,
        'ongoing_game_websocket_url': pl.String,
        'lastonline': utc_millis,
    }
    source_frame = pl.DataFrame(user_row, schema=user_schema)

    merge_options = {
        'predicate': 'source.userid = target.userid and source.cur_game_version = target.cur_game_version ',
        'source_alias': 'source',
        'target_alias': 'target',
    }
    merger = source_frame.write_delta(
        self.basecreatedb.usersdbpathforgame,
        mode='merge',
        delta_merge_options=merge_options,
        storage_options=GameDB.so,
    )

    # The creator flag is written as a SQL boolean literal ("true"/"false")
    # rather than taken from the source row.
    matched_updates = {
        "ongoing_gameid": "source.ongoing_gameid",
        "last_joined_gameat": "source.last_joined_gameat",
        "ongoing_game_websocket_url": "source.ongoing_game_websocket_url",
        "ongoing_game_is_creator": str(is_creator).lower(),
    }
    metrics = merger.when_matched_update(updates=matched_updates).execute()
    return metrics["num_target_rows_updated"] >= 1
@retry(Exception, tries=5, delay=1, backoff=2)
def _register_ongoing_game_with_retry(self, uid, gid, ts, game_version, websocket_url, is_creator=False):
    """Call ``_register_ongoing_game`` under the ``@retry`` decorator.

    All arguments are forwarded unchanged and the wrapped call's result is
    returned as-is.
    """
    registered = self._register_ongoing_game(
        uid,
        gid,
        ts,
        game_version,
        websocket_url,
        is_creator,
    )
    return registered
def try_join_game(self, username, g):
    """Try to claim one seat for ``username`` in the game described by ``g``.

    A one-row Polars frame built from ``g`` is merged into the games Delta
    table. The merge predicate compares every identifying column, including
    ``is_joinable`` and ``remaining_capacity``, so the update applies only
    while the stored row still carries the values the caller saw in ``g``.
    On a match the row's ``remaining_capacity`` is decremented,
    ``is_joinable`` is recomputed, the user id is appended to
    ``current_joined_gamers`` and ``allocated_capacity`` is incremented.
    After a successful update the game is recorded on the user's row via
    ``_register_ongoing_game_with_retry``.

    Args:
        username: Name hashed with ``mmh3.hash128`` into the user id.
        g: Mapping describing the game. The keys read are ``year``,
            ``month``, ``day``, ``hour``, ``minute``, ``second``,
            ``is_joinable``, ``game_version``, ``remaining_capacity``,
            ``token``, ``userid``, ``gameid`` and ``websocket_url``.

    Returns:
        ``True`` when the game row was updated and the registration call
        completed without raising. ``False`` when the game has no seat
        left, when the merge updated no row, or when any ``Exception`` was
        raised along the way (the exception is logged, not propagated).
    """
    # Local import: the file's import block is not visible from this block.
    import logging

    try:
        seats_left = g["remaining_capacity"]
        # Never drive a full game to a negative capacity.
        if seats_left < 1:
            return False
        seats_after_join = seats_left - 1

        uid = str(mmh3.hash128(username))
        utc_millis = pl.Datetime('ms', time_zone="UTC")
        df = pl.DataFrame(
            {
                "year": g["year"],
                "month": g["month"],
                "day": g["day"],
                "is_joinable": g["is_joinable"],
                "game_version": g["game_version"],
                "remaining_capacity": seats_left,
                "token": g["token"],
                "userid": g["userid"],
                "hour": g["hour"],
                "minute": g["minute"],
                "second": g["second"],
                "gameid": g["gameid"],
                "current_joined_gamers": None,
                "createdat": None,
                "startedat": None,
                "finishedat": None,
                "is_started": None,
                "is_finished": None,
                "websocket_url": None,
                "allocated_capacity": None,
                "capacity": None,
                "won_team": None,
            },
            schema={
                "year": pl.Int32,
                "month": pl.Int32,
                "day": pl.Int32,
                "is_joinable": pl.Boolean,
                "game_version": pl.String,
                "remaining_capacity": pl.Int32,
                "token": pl.String,
                "hour": pl.Int32,
                "minute": pl.Int32,
                "second": pl.Int32,
                "userid": pl.String,
                "gameid": pl.String,
                "current_joined_gamers": pl.Array(pl.String, 0),
                "createdat": utc_millis,
                "startedat": utc_millis,
                "finishedat": utc_millis,
                "is_started": pl.Boolean,
                "is_finished": pl.Boolean,
                "websocket_url": pl.String,
                "allocated_capacity": pl.Int32,
                "capacity": pl.Int32,
                "won_team": pl.String,
            },
        )

        match_columns = [
            "year", "month", "day",
            "is_joinable", "game_version", "remaining_capacity", "token",
            "userid", "hour", "minute", "second",
            "gameid",
        ]
        condition = " and ".join(
            f"source.{col} = target.{col}" for col in match_columns
        )

        wop = (
            df.write_delta(
                self.basecreatedb.gamedbpath,
                mode='merge',
                delta_merge_options={
                    'predicate': condition,
                    'source_alias': 'source',
                    'target_alias': 'target',
                },
                storage_options=GameDB.so,
            )
            .when_matched_update(updates={
                # Update values are SQL expressions, hence the string forms.
                "is_joinable": str(seats_after_join > 0).lower(),
                "remaining_capacity": str(seats_after_join),
                "current_joined_gamers": "array_append(target.current_joined_gamers, '{}')".format(uid),
                "allocated_capacity": "target.allocated_capacity+1",
            })
            .execute()
        )
        if wop["num_target_rows_updated"] < 1:
            return False

        joined_at_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        # NOTE(review): the boolean result of the registration is ignored, and
        # if it raises after all retries this method returns False although
        # the seat has already been taken on the game row - confirm intended.
        self._register_ongoing_game_with_retry(
            uid, g["gameid"], joined_at_ms, g["game_version"], g["websocket_url"]
        )
        return True
    except Exception:
        # Stay best-effort for callers, but keep the traceback and let
        # KeyboardInterrupt / SystemExit propagate.
        logging.getLogger(__name__).exception("try_join_game failed")
        return False
Yorumlar
Yorum Gönder