任务类
Class: Job
- Extends: Object
CloudPSS 任务类。
job.id
任务ID。
job.args
任务参数。
job.job_status
任务状态。
job.createTime
任务创建时间。
job.startTime
任务开始时间。
job.endTime
任务结束时间。
job.context
任务执行上下文信息。
job.input
任务执行输入流 ID,用于发送数据给当前任务。
job.output
任务执行输出流 ID,用于接收任务执行的输出结果。
job.result
任务结果,用于兼容旧版本的使用方法。
Job.fetch(id)
获取指定 id 的 job 信息,如果 id 不存在直接抛异常。
job = Job.fetch('job/1')
Job.fetchMany(name=None, cursor=[], pageSize=10)
获取用户有权限的 job 列表。
data = Job.fetchMany()
"""
返回的数据格式 {
'cursor': ['1705377106000'], # 当前返回数据游标
'total': 4893, # 任务总数
'count': 10, # 当前返 回数据量
'items':[
{'id': 'job/1', 'name': 'job1', 'description': 'job1'}
....
]
}
"""
# 查询下个游标后的数据
cursor = data['cursor']
data2 = Job.fetchMany(cursor=cursor)
# 模糊查询
data2 = Job.fetchMany(name='job1')
print('获取任务列表', data)
Job.create(revisionHash, job, config, name=None, rid="", policy=None, **kwargs)
- 静态方法
revisionHash
: String 项目版本的 hashjob
: Dict 计算方案config
: Dict 参数方案name
: String 任务名称rid
: String 任务的 ridpolicy
: Dict 仿真策略kwargs
: Dict 仿真参数- Returns: Job 返回一个运行实例
创建一个运行任务。
job = Job.create(revisionHash, job, config, name=None, rid="", policy=None, **kwargs)
Job.dump(job, file, format="yaml", compress="gzip")
- 静态方法
job
: Job 任务实例file
: String 保存文件路径format
: String 保存文件格式, 默认为 yaml 格式compress
: String 保存文件压缩格式,默认为 gzip 格式
下载 job。
Job.dump(job, file, format="yaml", compress="gzip")
Job.load(file, format="yaml")
加载 job。
Job.load(file, format="yaml")
job.read(receiver=None, **kwargs)
- 静态方法
receiver
:JobReceiver 接收者kwargs
: Dict 仿真参数- Returns: MessageStreamReceiver 输出流实例
接收当前运行实例的输出。
Job.read(receiver=None, dev=False, **kwargs)
job.write(sender=None, **kwargs)
- 静态方法
sender
: MessageStreamSender 发送者kwargs
: Dict 仿真参数- Returns: MessageStreamSender 输入流实例
发送为当前运行实例 输入。
Job.write(sender=None, dev=False, **kwargs)
job.view(viewType)
- 实例方法
viewType
: Result 结果类,分为:EMTView
电磁暂态结果PowerFlowView
潮流结果,IESView
综合能源结果IESLabSimulationView
综合能源建模仿真结果IESLabTypicalDayView
综合能源典型日结果
- Returns: Result 返回一个结果实例
获取当前运行实例的输出。
job = Job.fetch(id)
view = job.view(EMTView)
job.status()
- Returns: Number 返回任务状态
0
: 运行中1
: 运行完成2
: 运行失败
任务状态(旧版本兼容)。
import os
import time
import cloudpss
if __name__ == "__main__":
cloudpss.setToken(token) ## 输入 token
os.environ['CLOUDPSS_API_URL'] = 'https://cloudpss.net/'
job = cloudpss.Job.fetch(id)
st = time.time()
while not job.status():
time.sleep(0.1)
print('计算完成', time.time() - st)
job.abort(timeout=3)
timeout
: Number 超时 3 秒
中断当前运行实例。
job = Job.fetch(id)
job.abort(3)